Java NIO - Netty 的核心原理和基础组件测试案例

Netty 的简介

Netty 是一款卓越的 Java NIO 客户端/服务器框架,专为构建高性能、高可扩展性且易于维护的网络程序而生。作为一个 异步事件驱动 的基础框架,它为开发者提供了一套强大且易用的工具集。

与直接使用 Java 原生 NIO 相比,Netty 极大地简化了网络编程的复杂度。它不仅优化了传统的 TCP 和 UDP 套接字开发,更显著降低了 HTTP Web 服务等高级应用场景的门槛。其核心设计目标之一便是 极致的开发体验。无论是自定义的 TCP/UDP 私有协议,还是 FTP、SMTP、HTTP 等标准应用层协议,开发者都能在 Netty 的支持下实现快速构建与平滑部署。通过对复杂 NIO 逻辑的封装,Netty 让代码逻辑更加清晰,有效缩短了从原型开发到生产上线的周期。

Netty 的另一大核心使命是 实现服务的高性能与高可扩展性。基于 Java NIO 的底层能力,Netty 打造了一套工业级的 Reactor 模式实现。通过对 Channel(通道)和 Handler(处理器)等基础类库的高效抽象,Netty 构建了一个灵活的事件处理链(Pipeline)。得益于这种精妙的设计,开发者可以针对不同的通信协议和业务逻辑进行无缝扩展,轻松应对高并发、海量连接的极端挑战。


Netty Hello world 案例

案例代码

依赖包:

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.2.12.Final</version><!--已改为最新-->
</dependency>

NettyDiscardServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/**
* @author KJ
* @description netty hello world 程序
*/
public class NettyDiscardServer {

// 服务引导类是一个组装和集成器,职责是将不同的Netty组件组装在一起。
ServerBootstrap b = new ServerBootstrap();
// 服务端口
private final int serverPort;

public NettyDiscardServer(int port) {
this.serverPort = port;
}

public void runServer() {
/**
* 创建反应器轮询组:
* 反应器组件的作用是进行IO事件的查询和分发。Netty中对应的反应器组件有多种,不同应用通信场景用到的反应器组件各不相同。
* 一般来说,对应于多线程的Java NIO通信的应用场景,Netty对应的反应器组件为 NioEventLoopGroup。
*
* 本案例使用了两个NioEventLoopGroup反应器组件实例:
* 第一个负责服务器通道新连接的IO事件的监听,可以形象地理解为“包工头”角色;第二个主要负责传输通道的IO事件的处理和数据传输,可以形象地理解为“工人”角色。
*
* 注意:在 Netty 5.0 的早期设计(以及部分 4.x 的后续演进计划)中,官方确实对线程模型进行了重构。
* NioEventLoopGroup 被标记为 @Deprecated,这通常意味着 Netty 正在推行统一的泛型化 I/O 模型。
* 官方不再希望开发者直接绑定到具体的底层实现(如 NIO 或 Epoll),而是通过 IoEventLoopGroup 配合 IoHandlerFactory 来创建。
* 这种变化的核心逻辑是 “解耦”。比如 NIO 切换到 Linux 性能更强的 io_uring,只需要更换 IoHandler 插件,而不需要更换整个线程池实现类。
*/
// EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
// EventLoopGroup workerLoopGroup = new NioEventLoopGroup(); // 默认的内部线程数量为最大可用的CPU处理器数量的2倍。
EventLoopGroup bossLoopGroup = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory());
EventLoopGroup workerLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());

try {
// 1.设置反应器轮询组
b.group(bossLoopGroup, workerLoopGroup)
.option(ChannelOption.SO_BACKLOG, 128) // option 作用于父通道
.childOption(ChannelOption.SO_KEEPALIVE, true) // childOption 作用于传输通道(即子通道 Worker)
.childOption(ChannelOption.TCP_NODELAY, true);
// 2.设置nio类型的通道
b.channel(NioServerSocketChannel.class);
// 3.设置监听端口
b.localAddress(serverPort);
// 4.装配子通道流水线(装配Pipeline流水线)
b.childHandler(new ChannelInitializer<SocketChannel>() { // 泛型需要和引导类中设置的传输通道类型对应起来。
// 通道初始化时会调用该方法。有连接到达时会创建一个通道的子通道,并初始化。
// 一般来说,initChannel()方法的大致业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。
protected void initChannel(SocketChannel ch) {
// 流水线的职责:负责管理通道中的处理器。
// 向子通道(传输通道)流水线添加一个处理器。
// Handler的作用是对就绪的IO事件,完成IO事件的业务处理。在Reactor模式中,所有的业务处理都在Handler中完成,业务处理一般需要自己编写。
ch.pipeline().addLast(new NettyDiscardHandler());
}
});
// 5.开始绑定服务器新连接的监听端口。这里通过调用sync同步方法阻塞直到绑定成功
// 在Netty中,所有的IO操作都是异步执行的,这就意味着任何一个IO操作都会立即返回,返回时异步任务还没有真正执行。
// 什么时候执行完成呢?Netty中的IO操作都会返回异步任务实例(如channelFuture实例)。
// 该实例既可以实现同步阻塞一直到channelFuture异步任务执行完成,也可以通过为其增加事件监听器的方式注册异步回调逻辑,以获得Netty中的IO操作的真正结果。
// Future异步回调或者同步阻塞,涉及高并发的核心模式——异步回调模式,是高并发开发非常重要的基础性知识。
ChannelFuture channelFuture = b.bind().sync();
System.out.println("服务器启动成功,监听端口: " + channelFuture.channel().localAddress());

// 6.阻塞当前线程直到通道关闭,可以调用通道的closeFuture()方法,以获取通道关闭的异步任务。
// 当通道被关闭时,closeFuture 实例的 sync() 方法会返回。
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7.优雅关闭EventLoopGroup,释放掉所有资源,包括创建的反应器线程
// 关闭反应器轮询组,同时会关闭内部的子反应器线程,也会关闭内部的选择器、内部的轮询线程以及负责查询的所有子通道。
// 在子通道关闭后,会释放掉底层的资源,如Socket文件描述符等。
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}

/**
* Netty的Handler需要处理多种IO事件(如读就绪、写就绪),对应于不同的IO事件,Netty提供了一些基础方法。
* 这些方法都已经提前封装好,应用程序直接继承或者实现即可。比如说,对于处理入站的IO事件,其对应的接口为
* ChannelInboundHandler,并且Netty提供了ChannelInboundHandlerAdapter适配器作为入站处理器的默认实现。
*
* Netty中的出/入站与Java NIO中的出/入站有些微妙的不同,Netty的出站可以理解为从Handler传递到Channel的操作,
* 比如说write写通道、read读通道数据;Netty的入站可以理解为从Channel传递到Handler的操作,比如说Channel数据
* 过来之后,会触发Handler的channelRead()入站处理方法。
*
* Netty的ByteBuf缓冲区组件可以对应到前面介绍的Java NIO类库的数据缓冲区Buffer组件。
* 只不过相对而言,Netty的ByteBuf缓冲区性能更好,使用也更加方便。
*/
static class NettyDiscardHandler extends ChannelInboundHandlerAdapter { // 或者直接 extends SimpleChannelInboundHandler<String>
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
/*System.out.println("收到消息:");
while (in.isReadable()) {
System.out.print((char) in.readByte());
}
System.out.println();
*/
System.out.println("收到消息: " + in.toString(CharsetUtil.UTF_8));
} finally {
ReferenceCountUtil.release(msg);
}
}
}

public static void main(String[] args) {
new NettyDiscardServer(9000).runServer();
}
}

可以使用nc充当客户端进行测试:

1
$ nc 127.0.0.1 9000


ChannelOption 介绍

上述案例中,无论是对于 NioServerSocketChannel 父通道类型还是对于 NioSocketChannel 子通道类型,都可以设置一系列的 ChannelOption(通道选项)。ChannelOption 类中定义了一系列选项,常见如下:

  • SO_RCVBUFSO_SNDBUF:TCP传输选项,每个 TCP socket(套接字)在内核中都有一个发送缓冲区和一个接收缓冲区,这两个选项就是用来设置TCP连接 的两个缓冲区大小的。TCP的全双工工作模式以及TCP的滑动窗口对两 个独立的缓冲区都有依赖。
  • TCP_NODELAY:TCP传输选项,用于开启或关闭 Nagle 算法。默认的 true 表示关闭(有数据发送时就马上发送);如果要减少发送次数、减少网络交互,就设置为false,等累积一定大小的数据后再发送。。
  • SO_KEEPALIVE:此为TCP传输选项,表示是否开启TCP的心跳机制。true为连接保 持心跳,默认值为false。启用该功能时,TCP会主动探测空闲连接的 有效性。需要注意的是:默认的心跳间隔是7200秒,即2小时。Netty 默认关闭该功能。
  • SO_REUSEADDR:此为TCP传输选项,为true时表示地址复用,默认值为false。有 四种情况需要用到这个参数设置:
    • 当有一个地址和端口相同的连接socket1处于TIME_WAIT状态时,而又希望启动一个新的连接socket2要占用该地址和端口。
    • 有多块网卡或用 IP Alias 技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。
    • 同一进程绑定相同的端口到多个socket(套接字)上,但每个 socket 绑定的IP地址不同。
    • 完全相同的地址和端口的重复绑定,但这只用于UDP的多播,不用于TCP。
  • SO_LINGER:此为TCP传输选项,可以用来控制socket.close()方法被调用后的行为,包括延迟关闭时间。
    • 默认的 -1表示禁用该功能,socket.close() 方法在调用后立即返回,但操作系统底层会将发送缓冲区的数据全部发送到对端;
    • 如果此选项设置为0,就表示 socket.close()方法在调用后会立即返回,但是操作系统会放弃发送缓冲区数据,直接向对端发送RST包,对端将收到复位错误;
    • 如果此选项设置为非0整数值,就表示调用socket.close()方法的线程被阻塞, 直到延迟时间到来,发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。
  • SO_BACKLOG:此为TCP传输选项,表示服务端接收连接的队列长度,如果队列已满,客户端连接将被拒绝。服务端在处理客户端新连接请求时(三次握手)是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端到来的时候,服务端将不能处理的客户端连接请求放在队列中 等待处理。具体来说,服务端对完成第二次握手的连接放在一个队列(暂时称A队列),如果进一步完成第三次握手,再把连接从A队列移动到新队列(暂时称B队列),接下来应用程序会通过调用accept()方法取出握手成功的连接,而系统则会将该连接从B队列移除。A和B队列的长度之和是SO_BACKLOG指定的值,当A和B队列的长度之和大于SO_BACKLOG 值时,新连接将会被TCP内核拒绝。所以,如果SO_BACKLOG过小, accept速度可能会跟不上,A和B队列全满,导致新客户端无法连接。SO_BACKLOG对程序支持的连接数并无影响,影响的只是还没有 被accept取出的连接数,也就是三次握手的排队连接数。如果连接建立频繁,服务器处理新连接较慢,那么可以适当调大 这个参数。
  • SO_BROADCAST:此为TCP传输选项,表示设置为广播模式。


Netty 中的 Channel

常见的 Channel

通道是Netty的核心概念之一,代表网络连接,由它负责同对端进 行网络通信,既可以写入数据到对端,也可以从对端读取数据。Netty中不直接使用Java NIO的Channel组件,对Channel组件进行了自己的封装。对于每一种通信连接协议,Netty都实现了自己的通道。对应到不同的协议,每一种协议基本上都有NIO和OIO两个版本。常见NIO通道类型如下:

  • NioSocketChannel:异步非阻塞TCP Socket传输通道。
  • NioServerSocketChannel:异步非阻塞TCP Socket服务端监听 通道。
  • NioDatagramChannel:异步非阻塞的UDP传输通道。
  • NioSctpChannel:异步非阻塞Sctp传输通道。
  • NioSctpServerChannel:异步非阻塞Sctp服务端监听通道。

对应的常见阻塞通道有:

  • OioSocketChannel:同步阻塞式TCP Socket传输通道。
  • OioServerSocketChannel:同步阻塞式TCP Socket服务端监听 通道。
  • OioDatagramChannel:同步阻塞式UDP传输通道。
  • OioSctpChannel:同步阻塞式Sctp传输通道。
  • OioSctpServerChannel:同步阻塞式Sctp服务端监听通道。

一般来说,服务端编程用到最多的通信协议还是TCP,对应的 Netty传输通道类型为NioSocketChannel类、Netty服务器监听通道类 型为NioServerSocketChannel。不论是哪种通道类型,在主要的API和使用方式上和NioSocketChannel类基本都是相同的,更多是底层的传输协议不同,而Netty帮大家极大地屏蔽了传输差异。如果没有特殊情况,本书的很多案例都将以NioSocketChannel通道为主。


Channel 的源码

几乎所有的Netty通道实现类都继承了 AbstractChannel 抽象类, 都拥 parent 和 pipeline 两个属性成员。Netty通道的抽象类 AbstractChannel 的构造函数如下:

1
2
3
4
5
6
protected AbstractChannel(Channel parent) {
this.parent = parent; // 内部父通道属性,对于连接监听通道其父通道为null
id = newId();
unsafe = newUnsafe(); // 新建一个底层的NIO 通道,完成实际的IO操作
pipeline = newChannelPipeline(); // 内部流水线属性,新建一条通道流水线
}


Channel 的重要方法

通道接口中所定义的几个重要方法:

  • ChannelFuture connect(SocketAddress address):此方法的作用为连接远程服务器。方法的参数为远程服务器的地 址,调用后会立即返回,其返回值为执行连接操作的异步任务 ChannelFuture。此方法在 客户端 的传输通道使用。
  • ChannelFuture bind(SocketAddress address):此方法的作用为绑定监听地址,开始监听新的客户端连接。此方 法在 服务器 的新连接监听和接收通道时调用。
  • ChannelFuture close():此方法的作用为关闭通道连接,返回连接关闭的ChannelFuture异 步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任 务设置回调方法;或者调用ChannelFuture异步任务的sync()方法来阻 塞当前线程,一直等到通道关闭的异步任务执行完毕。
  • Channel read():此方法的作用为读取通道数据,并且启动入站处理。具体来说, 从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline 流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。
  • ChannelFuture write(Object o):此方法的作用为启程出站流水处理,把处理后的最终数据写到底层通道(如Java NIO通道)。此方法的返回值为出站处理的异步处理任务。
  • Channel flush():此方法的作用为将缓冲区中的数据立即写出到对端。调用前面的 write() 出站处理时,并不能将数据直接写出到对端,write操作的作 用在大部分情况下仅仅是写入操作系统的缓冲区,操作系统会根据缓 冲区的情况决定什么时候把数据写到对端。执行flush()方法会立即将 缓冲区的数据写到对端。


Netty 中的 Reactor 组件

在Reactor模式中,一个反应器(或者SubReactor子反应器)会由 一个事件处理线程负责事件查询和分发。该线程不断进行轮询,通过 Selector 选择器不断查询注册过的IO事件(选择键)。如果查询到IO 事件,就分发给Handler业务处理器。

Netty中的反应器组件有多个实现类,这些实现类与其通道类型相互匹配。对应于 NioSocketChannel通道,Netty的反应器类为NioEventLoop(NIO事件轮询),它有两个重要的成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性,与之前讲的反应器实现在思路上是一致的:一个NioEventLoop拥有一个线程,负责一个 Java NIO选择器的IO事件轮询。

在Netty中,EventLoop反应器和Channel的关系是什么呢?理论上 来说,一个EventLoop反应器和NettyChannel通道是一对多的关系:一个反应器可以注册成千上万的通道。


Netty 中的 Handler

Handler 简介

在Netty中,EventLoop反应器内部有一个线程负责Java NIO选择器的事件的轮询,然后进行对应的事件分发。事件分发(Dispatch) 的目标就是Netty的Handler(含用户定义的业务处理器)。Netty 的 Handler 分为两大类:第一类是ChannelInboundHandler入 站处理器;第二类是ChannelOutboundHandler出站处理器,二者都继 承了ChannelHandler处理器接口。

无论是入站还是出站,Netty都提供了各自的默认适配器实现:ChannelInboundHandler的默认实现为 ChannelInboundHandlerAdapter(入站处理适配器)。 ChannelOutboundHandler的默认实现为 ChannelOutBoundHandlerAdapter(出站处理适配器)。这两个默认的通道处理适配器分别实现了基本的入站操作和出站操作功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。


两类 Handler 的核心方法

对于 ChannelInboundHandler 的核心方法,大致介绍如下:

  • channelRegistered:当通道注册完成后,Netty会调用fireChannelRegistered()方 法,触发通道注册事件,而在通道流水线注册过的入站处理器的 channelRegistered()回调方法会被调用。
  • channelActive:当通道激活完成后,Netty会调用fireChannelActive()方法,触 发通道激活事件,而在通道流水线注册过的入站处理器的 channelActive()回调方法会被调用。
  • channelRead:当通道缓冲区可读时,Netty会调用fireChannelRead()方法,触 发通道可读事件,而在通道流水线注册过的入站处理器的 channelRead()回调方法会被调用,以便完成入站数据的读取和处理。
  • channelReadComplete:当通道缓冲区读完时,Netty会调用fireChannelReadComplete() 方法,触发通道缓冲区读完事件,而在通道流水线注册过的入站处理 器的channelReadComplete()回调方法会被调用。
  • channelInactive:当连接被断开或者不可用时,Netty会调用 fireChannelInactive()方法,触发连接不可用事件,而在通道流水线注册过的入站处理器的channelInactive()回调方法会被调用。
  • exceptionCaught:当通道处理过程发生异常时,Netty会调用 fireExceptionCaught()方法,触发异常捕获事件,而在通道流水线注 册过的入站处理器的exceptionCaught()方法会被调用。注意,这个方法是在ChannelHandler 中定义的方法,入站处理器、出站处理器接口 都继承了该方法。

ChannelOutboundHandler 接口定义了大部分的出站操作。Netty出站处理的方向是通过上层Netty通道去操底层Java IO通道,主要出站(Outbound)的操作如下:

  • bind:监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑 定。如果使用TCP传输协议,这个方法用于服务端。
  • connect:连接服务端:完成底层Java IO通道的服务端的连接操作。如果使 用TCP传输协议,那么这个方法将用于客户端。
  • write:写数据到底层:完成Netty通道向底层Java IO通道的数据写入操 作。此方法仅仅是触发一下操作,并不是完成实际的数据写入操作。
  • flush:将底层缓存区的数据腾空,立即写出到对端。
  • read:从底层读数据:完成Netty通道从Java IO通道的数据读取。
  • disConnect:断开服务器连接:断开底层Java IO通道的socket连接。如果使用 TCP传输协议,此方法主要用于客户端。
  • close:主动关闭通道:关闭底层的通道,例如服务端的新连接监听通道。


Handler 生命周期测试

下面是一个 ChannelInboundHandler 的生命周期的测试案例:

DemoInHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class DemoInHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("被调用:handlerAdded()");
super.handlerAdded(ctx);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("被调用:channelRegistered()");
super.channelRegistered(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("被调用:channelActive()");
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object
msg) throws Exception {
System.out.println("被调用:channelRead()");
super.channelRead(ctx, msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("被调用:channelReadComplete()");
super.channelReadComplete(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("被调用:channelInactive()");
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("被调用: channelUnregistered()");
super.channelUnregistered(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("被调用:handlerRemoved()");
super.handlerRemoved(ctx);
}
}

DemoInHandlerTest:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 关于 EmbeddedChannel:
* 在实际开发中,底层通信传输的基础工作Netty已经替大家完成。实际上,更多的工作是设计和开发ChannelHandler业务处理。
* 处理器开发完成后,需要投入单元测试。一般单元测试的大致流程是:先将Handler业务处理器加入到通道的Pipeline流水线中,
* 接下来先后启动Netty服务器、客户端程序,相互发送消息,测试业务处理器的效果。这些复杂的工序存在一个问题:如果每开发
* 一个业务处理器都进行服务器和客户端的重复启动,那么整个的过程是非常烦琐和浪费时间的。为了解决这种徒劳、低效的重复工作,
* Netty提供了一个专用通道,即EmbeddedChannel(嵌入式通道)。
*
* EmbeddedChannel仅仅是模拟入站与出站的操作,底层不进行实际传输,不需要启动Netty服务器和客户端。除了不进行传输之外,
* EmbeddedChannel的其他事件机制和处理流程和真正的传输通道是一模一样的。因此,使用EmbeddedChannel,开发人员可以在
* 单元测试用例中方便、快速地进行ChannelHandler业务处理器的单元测试。其中重要的方法如下:
* (1)writeInbound:模拟底层的入站包,从而被入站处理器处理到。
* (2)writeOutbound:向模拟通道写入一个出站数据(如二进制ByteBuf数据包),该包将进入处理器流水线,被待测试的出站处理器所处理。
*/
public class DemoInHandlerTest {
@Test
public void testInHandlerLifeCircle() {
final DemoInHandler inHandler = new DemoInHandler();

// 初始化处理器
ChannelInitializer i =
new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(inHandler);
}
};
// 创建嵌入式通道
EmbeddedChannel channel = new EmbeddedChannel(i);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
// 模拟入站,向嵌入式通道写一个入站数据包
channel.writeInbound(buf);
channel.flush();
// 模拟入站,再写一个入站数据包
channel.writeInbound(buf);
channel.flush();
// 通道关闭
channel.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
被调用:handlerAdded()          # 当业务处理器被加入到流水线后,此方法将被回调。
被调用:channelRegistered() # 当通道成功绑定一个NioEventLoop反应器后,此方法将被回调。
被调用:channelActive() # 当通道激活成功后,此方法将被回调。

被调用:channelRead() # 有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
被调用:channelReadComplete() # 流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。
被调用:channelRead()
被调用:channelReadComplete()

被调用:channelInactive() # 当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,会首先回调所有业务处理器的 channelInactive 方法
被调用: channelUnregistered() # 通道和NioEventLoop反应器解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的 channelUnregistered 方法
被调用:handlerRemoved() # 移除掉通道上所有的业务处理器,并且回调所有业务处理器的 handlerRemoved 方法


Netty 中的 Pipeline

Pipeline 的由来

我们先来梳理一下Netty 的 Reactor 模式实现中各个组件之间的关系:

  • 反应器(或者SubReactor子反应器)和通道之间是一对多的 关系:一个反应器可以查询很多个通道的IO事件。
  • 通道和Handler处理器实例之间是多对多的关系:一个通道的IO事件可以被多个Handler实例处理;一个Handler处理器实例也能绑定到很多通道,处理多个通道的IO事件。

问题是:通道和Handler处理器实例之间的绑定关系,Netty是如何组织的呢?Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线)。它像一条管道,将绑定到一个通道的多个Handler处理器实例串联在一起,形成一条流水线。ChannelPipeline的默认实现实际上被设计成一个双向链表。所有的Handler处理器实例被包装成双向链表的节 点,被加入到 ChannelPipeline 中。


InPipeline 和 OutPipeline

ChannelPipeline 是基于 责任链设计模式 来设计的。入站处理时,每一个来自通道的IO事件都会进入一次 ChannelPipeline。在进入第一个Handler处理器后,这个IO事件将按照既定的从前往后次序,在流水线上不断地向后流动,流向下一个 Handler处理器。在向后流动的过程中,会出现3种情况:

  • 如果后面还有其他Handler入站处理器,那么IO事件可以交 给下一个Handler处理器向后流动。
  • 如果后面没有其他的入站处理器,就意味着这个IO事件在此 次流水线中的处理结束了。
  • 如果在中间需要终止流动,可以选择不将IO事件交给下一个 Handler处理器,流水线的执行也被终止了。

Netty的通道流水线与普通的流水线不同,Netty的流水线不是单向的,而是双向的,而普通的流水线基本都是单向的。Netty是这样规 定的:入站处理器的执行次序是从前到后,出站处理器的执行次序是从后到前。总之,IO事件在流水线上的执行次序与IO事件的类型是有关系的。

除了流动的方向与IO操作类型有关之外,流动过程中所经过的处 理器类型也是与IO操作的类型有关的。入站的IO操作只能从Inbound入 站处理器类型的Handler流过;出站的IO操作只能从Outbound出站处理 器类型的Handler流过。

在了解完流水线之后,大家应该对Netty中的通道、 EventLoop反应器、处理器,以及三者之间的协作关系,有了一个清晰 的认知和了解,基本可以动手开发简单的Netty程序了。为了方便开发 者,Netty提供了一系列辅助类,用于把上面的三个组件快速组装起来 完成一个Netty应用,这个系列的类叫作引导类。服务端的引导类叫作 ServerBootstrap 类,客户端的引导类叫作 Bootstrap 类。


绑定和执行顺序测试

下面我们来写一个流水线绑定和执行顺序的测试:

InPipeline:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class InPipeline {
public static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器A: 被回调");
// 如果将这一行注释掉(即不执行 ctx.fireChannelRead),那么 B 和 C 根本就不会被调用到!(可以实现流水线的截断)
super.channelRead(ctx, msg);
}
}

public static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器B: 被回调");
super.channelRead(ctx, msg);
}
}

public static class SimpleInHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器C: 被回调");
super.channelRead(ctx, msg);
}
}
}

OutPipeline:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class OutPipeline {
public static class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("出站处理器A: 被回调");
super.write(ctx, msg, promise);
}
}

public static class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("出站处理器B: 被回调");
super.write(ctx, msg, promise);
}
}

public static class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("出站处理器C: 被回调");
// 如果将这一行注释掉(即不执行 ctx.write),那么 B 和 A 根本就不会被调用到!(可以实现流水线的截断)
super.write(ctx, msg, promise);
}
}
}

PipelineInBoundTest:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class PipelineInBoundTest {
@Test
public void testPipelineInBound() {
ChannelInitializer channelInit =
new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
// in:从前往后执行
ch.pipeline().addLast(new InPipeline.SimpleInHandlerA());
ch.pipeline().addLast(new InPipeline.SimpleInHandlerB());
ch.pipeline().addLast(new InPipeline.SimpleInHandlerC());

// out:从后往前执行
ch.pipeline().addLast(new OutPipeline.SimpleOutHandlerA());
ch.pipeline().addLast(new OutPipeline.SimpleOutHandlerB());
ch.pipeline().addLast(new OutPipeline.SimpleOutHandlerC());
}
};
EmbeddedChannel channel = new EmbeddedChannel(channelInit);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
channel.writeInbound(buf); // 向通道写一个入站报文(数据包)
System.out.println();
channel.writeOutbound(buf); // 向通道写一个出站报文(或数据包)
}
}
1
2
3
4
5
6
7
入站处理器A: 被回调
入站处理器B: 被回调
入站处理器C: 被回调

出站处理器C: 被回调
出站处理器B: 被回调
出站处理器A: 被回调


ChannelHandlerContext

通道处理器上下文简介

在Netty的设计中Handler是无状态的,不保存和Channel有关的信息。Handler的目标是将自己的处理逻辑做得很通用,可以给不同的 Channel使用。与Handler不同的是,Pipeline是有状态的,保存了 Channel的关系。于是,Handler和Pipeline之间需要一个中间角色将 它们联系起来。这个中间角色就是 ChannelHandlerContext(通道处理器上下文)。

不管我们定义的是哪种类型的业务处理器,最终它们都是以双向 链表的方式保存在流水线中。这里流水线的节点类型并不是前面的业 务处理器基类,而是其包装类型ChannelHandlerContext类。当业务处 理器被添加到流水线中时会为其专门创建一个ChannelHandlerContext 实例,主要封装了ChannelHandler(通道处理器)和 ChannelPipeline(通道流水线)之间的关联关系。所以,流水线 ChannelPipeline中的双向链接实质是一个由ChannelHandlerContext 组成的双向链表。作为Context的成员,无状态的Handler关联在 ChannelHandlerContext中。

总结一下Channel、Handler、ChannelHandlerContext三者的关系:Channel拥有一条ChannelPipeline,每一个流水线节点为一个 ChannelHandlerContext上下文对象,每一个上下文中包裹了一个 ChannelHandler。在ChannelHandler的入站/出站处理方法中,Netty 会传递一个Context实例作为实际参数。处理器中的回调代码可以通过 Context实参,在业务处理过程中去获取ChannelPipeline实例或者 Channel实例。


上下文方法执行的特点

ChannelHandlerContext中包含了许多方法,可分为两类: 第一类是获取上下文所关联的Netty组件实例,如所关联的通道、所关 联的流水线、上下文内部Handler业务处理器实例等;第二类是入站和出站处理方法。

在Channel、ChannelPipeline、ChannelHandlerContext三个类 中,都存在同样的出站和入站处理方法,这些出现在不同的类中的相 同方法,功能有何不同呢?如果通过Channel或ChannelPipeline的实例来调用这些出站和入 站处理方法,它们就会在整条流水线中传播。如果是通过 ChannelHandlerContext 调用出站和入站处理方法,就只会从当前的节点开始往同类型的下一站处理器传播,而不是在整条流水线从头至尾 进行完整的传播。


Head 与 Tail Context

通道流水线在没有加入任何处理器之前装配了两个默认的处理器 上下文:一个头部上下文HeadContext,一个尾部上下文 TailContext。pipeline的创建、初始化除了保存一些必要的属性外, 核心就在于创建了HeadContext头节点和TailContext尾节点。每个流水线中双向链表结构从一开始就存在了HeadContext和 TailContext两个节点,后面添加的处理器上下文节点都添加在 HeadContext实例和TailContext实例之间。

流水线尾部的TailContext不仅仅是一个上下文类,还是一个入站 处理器类,实现了所有入站处理回调方法,这些回调实现的主要工作 基本上都是有关收尾处理的,如释放缓冲区对象、完成异常处理等。流水线头部的 HeadContext 比TailContext 复杂得多,既是一个出站处理器,也是一个入站处理器,还保存了一个 unsafe(完成实际通道传输的类)实例,也就是 HeadContext 还需要负责最终的通道传输工作。


入站出站双向链接操作

流水线的一个入站(读)操作和一个出站(写)操作,源码大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head; // HeadContext
final AbstractChannelHandlerContext tail; // TailContext

// 出站:启动流水线的出站写
@Override
public ChannelFuture write(Object msg) {
return tail.write(msg); // 从后往前传递
}

// 入站:启动流水线的入站读
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg); // 从头往后传递
return this;
}
// ...
}

完整的出站和入站处理流转过程都是通过调用流水线实例的相应 出/入站方法开启的。先看看入站处理的流转过程,以流水线的入站读的启动过程为例,从以上源码可以看出,流水线的入站流程是从 fireXXX()方法开始的(XXX表示具体入站操作,入站读的操作为 ChannelRead)。在fireChannelRead的源码中,从流水线的头节点 Head 开始,将入站的msg数据沿着流水线上的入站处理器逐个向后传递。如果所有的入站处理过程都没有截断流水线的处理,则该入站数据 msg(如ByteBuffer缓冲区)将一直传递到流水线的末尾,也就是 TailContext 处理器。

同理,流水线的出站流程是从流水线的尾部节点Tail 开始的,将出站的msg数据沿着流水线上的出站处理器逐个向前传递。出站msg数据在经过所有出站处理器之后,将一直传递到流水线的头部,也就是 HeadContext 处理器,并且通过unsafe传输实例将二进制 数据写入底层传输通道,完成整个传输处理过程。


截断流水线的方法

在入站/出站的过程中,如果由于业务条件不满足而需要截断流水 线的处理,不让处理传播到下一站,那么该怎么办呢?实际上我们在前面 绑定和执行顺序测试 已经做了测试,只要执行父类的 super.channelXXX 或者 super.writeXXX 即可实现所谓的流水线截断。

流水线的出站处理传播流程如何截断呢?结论是:出站处理流程只要开始执行,就不能被截断,强行截断的话Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。


在流水线热插拔Handler

Netty 中的处理器流水线是一个双向链表。在程序执行过程中,可 以动态进行业务处理器的热插拔:动态地增加、删除流水线上的业务处理器。主要的Handler热拔插方法声明在ChannelPipeline接口中, 具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package io.netty.channel;
// ...

public interface ChannelPipeline extends Iterable<Entry<String, ChannelHandler>> {
//...

// 在流水线头部增加一个业务处理器,名字由name指定
ChannelPipeline addFirst(String name, ChannelHandler handler);
// 在流水线尾部增加一个业务处理器,名字由name指定
ChannelPipeline addLast(String name, ChannelHandler handler);

// 在baseName处理器的前面增加一个业务处理器,名字由name指定
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
// 在baseName处理器的后面增加一个业务处理器,名字由name指定
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

// 删除一个业务处理器实例
ChannelPipeline remove(ChannelHandler handler);
// 删除一个处理器实例
ChannelHandler remove(String handler);

// 删除第一个业务处理器
ChannelHandler removeFirst();
// 删除最后一个业务处理器
ChannelHandler removeLast();

//...
}

下面是一个简单的示例:调用流水线实例的 remove(ChannelHandler)方法,从流水线动态地删除一个 Handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.jupiter.api.Test;

public class PipelineHotOperateTest {
static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器A: 被回调");
super.channelRead(ctx, msg);
// 从流水线删除当前业务处理器
ctx.pipeline().remove(this);
}
}

static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器B: 被回调");
super.channelRead(ctx, msg);
}
}

static class SimpleInHandlerC extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器C: 被回调");
super.channelRead(ctx, msg);
}
}

@Test
public void testPipelineHotOperating() {
ChannelInitializer channelInit = new
ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new SimpleInHandlerA());
ch.pipeline().addLast(new SimpleInHandlerB());
ch.pipeline().addLast(new SimpleInHandlerC());
}
};
EmbeddedChannel channel = new EmbeddedChannel(channelInit);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);

// 第一次向通道写入站报文(或数据包)
channel.writeInbound(buf);
System.out.println();

// 第二次向通道写入站报文(或数据包)
channel.writeInbound(buf);
System.out.println();

// 第三次向通道写入站报文(或数据包)
channel.writeInbound(buf);
System.out.println();
}
}
1
2
3
4
5
6
7
8
9
入站处理器A: 被回调
入站处理器B: 被回调
入站处理器C: 被回调

入站处理器B: 被回调
入站处理器C: 被回调

入站处理器B: 被回调
入站处理器C: 被回调

这里为大家分析一下通道初始化处理器 ChannelInitializer 没有 被重复调用的原因。通过翻看源码可知,在注册完成 channelRegistered 回调方法中调用 ctx.pipeline().remove(this) 将自己从流水线中删除了,所以该处理器仅仅被执行了一次。有关 ChannelInitializer 的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package io.netty.channel;
//...

public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter {
//...

// 通道初始化,抽象方法,需要子类实现
protected abstract void initChannel(Channel channel) throws Exception;

// 回调方法:加入通道(注册完成)后触发
public final void channelRegistered(ChannelHandlerContext ctx){
// 调用通道初始化实现
this.initChannel(ctx.channel());
// 删除通道初始化处理器
ctx.pipeline().remove(this);
// 发送注册消息到下一站
ctx.fireChannelRegistered();
}

//...
}

ChannelInitializer在完成了通道的初始化之后,为什么要将自 己从流水线中删除呢?原因很简单,就是一条通道流水线只需要做一次装配工作即可。